package cn.fzkj.config;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;

public class MqttConsumerCallBack implements MqttCallback {
    /**
     * 客户端断开连接的回调
     */
    @Override
    public void connectionLost(Throwable throwable) {
        System.out.println("与服务器断开连接，可重连");
    }

    /**
     * 消息到达的回调
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        System.out.println(String.format("接收消息主题 : %s",topic));
        System.out.println(String.format("接收消息Qos : %d",message.getQos()));
//        System.out.println(String.format("接收消息内容 : %s", new String(message.getPayload())));
//        byte[] payload = message.getPayload();
//        if (payload.length == 41){
//            byte[] header = subByte(payload, 0, 4);
//            System.out.println("header : " + convertByteToInteger(header));
//            byte[] type = subByte(payload, 4, 1);
//            System.out.println("type : " + convertByteToInteger(type));
//            byte[] length = subByte(payload, 5, 2);
//            System.out.println("length : " + convertByteToInteger(length));
//        }
//        if (payload.length == 116){
//
//        }
//        try {
//            System.out.println("消息内容：" + bytesToHexString(payload));
//        }catch (Exception e){
//            System.out.println(e.getMessage());
//        }
//        System.out.println(String.format("接收消息内容 : %s", payload));
        System.out.println(String.format("接收消息retained : %b",message.isRetained()));
    }

    public String bytesToHexString(byte[] bArray) {
        StringBuffer sb = new StringBuffer(bArray.length);
        String sTemp;
        for (int i = 0; i < bArray.length; i++) {
            sTemp = Integer.toHexString(0xFF & bArray[i]);
            if (sTemp.length() < 2)
                sb.append(0);
            sb.append(sTemp + ",");
        }
        return sb.toString();
    }

    public byte[] subByte(byte[] b,int off,int length){
        byte[] b1 = new byte[length];
        System.arraycopy(b, off, b1, 0, length);
        return b1;
    }

    /**
     * 消息发布成功的回调
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        System.out.println(String.format("接收消息成功"));
    }
}
